package pub

import (
	"fmt"
	mqtt "github.com/eclipse/paho.mqtt.golang"
	"github.com/spf13/cobra"
	"sync"
	"time"
)

// PD holds the settings of the pub command. It is allocated in init and its
// fields are bound to the command-line flags of Pub.
var PD *Publish

// Publish groups the settings of the pub command.
type Publish struct {
	// Ip is the broker address; work prepends "tcp://" to it when connecting.
	Ip string
	// ConnNum is the number of simulated connections (one goroutine each).
	ConnNum int
	// EveryPack is the payload size of each message in KB: work builds a
	// payload of 1024*EveryPack bytes.
	EveryPack int
	// Dur is the pause between two consecutive sends, in milliseconds.
	Dur int
	// Total is the number of messages sent by each connection.
	Total int
	// Topic is the MQTT topic the messages are published to.
	Topic string
}

// Pub is the "pub" sub-command. It starts PD.ConnNum publisher goroutines,
// waits until every one of them has returned and then prints the number of
// messages that were scheduled (PD.Total per connection).
var Pub = &cobra.Command{
	Use:   "pub",
	Short: "发布",
	Example: `
	# 开启3个tcp客户端，每条消息包大小100kb， 时间间隔为1毫秒, 每个tcp最多发300条结束, topic: qtmd
	mqctl pub -c 3 -e 100 -d 1 -t 300 -i 127.0.0.1:1883 -o qtmd
`,
	Run: func(cmd *cobra.Command, args []string) {
		var wg sync.WaitGroup
		for i := 0; i < PD.ConnNum; i++ {
			wg.Add(1)
			// The first argument of work is the per-connection message
			// count, i.e. PD.Total; the payload size is the third argument.
			go work(PD.Total, PD.Dur, PD.EveryPack, PD.Ip, &wg)
		}

		wg.Wait()
		// The figure printed here is derived from the flags, not from a
		// count of publishes that actually succeeded.
		fmt.Println("消息发送总数: ", PD.Total*PD.ConnNum, " 完毕")
	},
}

func init() {
	PD = &Publish{}

	Pub.Flags().IntVarP(&PD.ConnNum, "cn", "c", 0, "mock num of tcp conn")
	Pub.Flags().IntVarP(&PD.EveryPack, "ep", "e", 0, "mock size of  every tcp pack -> kb")
	Pub.Flags().IntVarP(&PD.Dur, "dur", "d", 0, "mock time dur -> ms")
	Pub.Flags().IntVarP(&PD.Total, "total", "t", 0, "每个线程发送的消息条数")
	Pub.Flags().StringVarP(&PD.Ip, "ip", "i", "", "访问的ip")
	Pub.Flags().StringVarP(&PD.Topic, "topic", "o", "", "topic")
}

// work opens one MQTT connection to the broker at ip and publishes PD.Total
// messages on PD.Topic with QoS 0. Every message carries the same payload of
// 1024*everyPackSize bytes of 0xFF, and the sender pauses dur milliseconds
// after each send.
//
// count is not consulted: the number of messages is read from PD.Total.
//
// wg.Done is called when work returns. A failed connection attempt panics,
// because the function has no way to report an error to its caller.
func work(count, dur, everyPackSize int, ip string, wg *sync.WaitGroup) {
	defer wg.Done()

	// Upper bound on how long a single publish may take to complete.
	const publishWait = 30 * time.Second

	opts := mqtt.NewClientOptions().AddBroker("tcp://" + ip)

	c := mqtt.NewClient(opts)
	token := c.Connect()
	if token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	// Close the connection once every publish below has completed.
	defer c.Disconnect(250)

	// Build the payload in one allocation; a non-positive size yields an
	// empty payload.
	size := 1024 * everyPackSize
	if size < 0 {
		size = 0
	}
	payload := make([]byte, size)
	for i := range payload {
		payload[i] = 255
	}

	// Each publish runs in its own goroutine so that the send rate is set by
	// dur alone and not by how long the broker takes to accept a message.
	var pending sync.WaitGroup
	for i := 0; i < PD.Total; i++ {
		pending.Add(1)
		go func() {
			defer pending.Done()
			token := c.Publish(PD.Topic, 0, false, payload)
			if !token.WaitTimeout(publishWait) {
				fmt.Println("publish timed out after", publishWait)
				return
			}
			if err := token.Error(); err != nil {
				fmt.Println("publish failed:", err)
			}
		}()
		time.Sleep(time.Duration(dur) * time.Millisecond)
	}

	// Wait for the in-flight publishes instead of sleeping a fixed time.
	pending.Wait()
}
